/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_SQL_PLAN_CACHE_OB_PLAN_SET_
#define OCEANBASE_SQL_PLAN_CACHE_OB_PLAN_SET_

#include "lib/net/ob_addr.h"
#include "lib/container/ob_iarray.h"
#include "lib/container/ob_2d_array.h"
#include "share/partition_table/ob_partition_location_cache.h"
#include "sql/optimizer/ob_table_location.h"
#include "sql/optimizer/ob_table_partition_info.h"
#include "sql/executor/ob_task_executor_ctx.h"
#include "sql/plan_cache/ob_plan_cache_util.h"
#include "sql/plan_cache/ob_dist_plans.h"
#include "share/schema/ob_schema_struct.h"
#include "lib/hash/ob_hashset.h"

namespace test {
class TestPlanSet_basic_Test;
}
namespace oceanbase {
namespace share {
namespace schema {
class ObSchemaGetterGuard;
}
}  // namespace share
namespace sql {
class ObPlanCacheValue;
class ObPlanCache;
class ObCacheObject;
class ObPhysicalPlan;
class ObExecContext;
class ObSqlCtx;

typedef ObFixedArray<common::ObObjMeta, common::ObIAllocator> UserSessionVarMetaArray;
typedef ObFixedArray<ObPCConstParamInfo, common::ObIAllocator> ConstParamConstraint;
typedef ObFixedArray<ObPCParamEqualInfo, common::ObIAllocator> EqualParamConstraint;

enum ObPlanSetType {
  PST_SQL_CRSR = 0,  // sql plan
  PST_PRCD = 1,      // store procedure, store function, package
  PST_MAX
};

struct HashKey {
  inline bool operator==(const HashKey& other) const
  {
    bool bret = true;
    if (rowkey_.count() != other.rowkey_.count()) {
      bret = false;
    }
    for (int64_t i = 0; bret && i < rowkey_.count(); ++i) {
      if (rowkey_.at(i) != other.rowkey_.at(i)) {
        bret = false;
      }
    }
    return bret;
  }
  inline uint64_t hash() const
  {
    uint64_t hash_ret = 0;
    for (int64_t i = 0; i < rowkey_.count(); ++i) {
      hash_ret = rowkey_.at(i).hash(hash_ret);
    }
    return hash_ret;
  }
  void reuse()
  {
    rowkey_.reuse();
  }
  common::ObSEArray<ObObj, 4> rowkey_;
  TO_STRING_KV(K_(rowkey));
};

typedef common::hash::ObHashSet<HashKey, common::hash::NoPthreadDefendMode> UniqueHashSet;

// ObPlanSet is a set of ObPhysicalPlans with same PlanMetaInfo
class ObPlanSet : public common::ObDLinkBase<ObPlanSet> {
  friend class ObPhyLocationGetter;

  public:
  explicit ObPlanSet(ObPlanSetType type)
      : alloc_(common::ObNewModIds::OB_SQL_PLAN_CACHE),
        plan_cache_value_(NULL),
        type_(type),
        params_info_(ObWrapperAllocator(alloc_)),
        ref_lock_(common::ObLatchIds::PLAN_SET_LOCK),
        stmt_type_(stmt::T_NONE),
        fetch_cur_time_(false),
        is_ignore_stmt_(false),
        outline_param_idx_(common::OB_INVALID_INDEX),
        related_user_var_names_(alloc_),
        related_user_sess_var_metas_(alloc_),
        all_possible_const_param_constraints_(alloc_),
        all_plan_const_param_constraints_(alloc_),
        multi_stmt_rowkey_pos_(alloc_),
        pre_cal_expr_handler_(NULL)
  {}
  virtual ~ObPlanSet();

  public:
  int match_params_info(const ParamStore* params, ObPlanCacheCtx& pc_ctx, int64_t outline_param_idx, bool& is_same);
  int match_params_info(
      const common::Ob2DArray<ObParamInfo, common::OB_MALLOC_BIG_BLOCK_SIZE, common::ObWrapperAllocator, false>& infos,
      int64_t outline_param_idx, const ObPlanCacheCtx& pc_ctx, bool& is_same);
  int match_param_info(const ObParamInfo& param_info, const ObObjParam& param, const bool is_varchar_eq_char,
      const bool& is_batched_multi_stmt, bool& is_same) const;
  int match_param_bool_value(
      const ObParamInfo& param_info, const ObObjParam& param, const bool& is_batched_multi_stmt, bool& is_same) const;
  static int match_multi_stmt_info(
      const ParamStore& params, const common::ObIArray<int64_t>& multi_stmt_rowkey_pos, bool& is_match);
  ObPlanSetType get_type()
  {
    return type_;
  }
  void set_type(ObPlanSetType plan_set_type)
  {
    type_ = plan_set_type;
  }
  static ObPlanSetType get_plan_set_type_by_cache_obj_type(ObCacheObjType co_type);
  void set_plan_cache_value(ObPlanCacheValue* pc_value)
  {
    plan_cache_value_ = pc_value;
  }
  ObPlanCacheValue* get_plan_cache_value()
  {
    return plan_cache_value_;
  }
  ObPlanCache* get_plan_cache() const;
  virtual int add_cache_obj(ObCacheObject& cache_object, ObPlanCacheCtx& pc_ctx, int64_t ol_param_idx) = 0;
  virtual int select_plan(
      share::ObIPartitionLocationCache* location_cache, ObPlanCacheCtx& pc_ctx, ObCacheObject*& plan) = 0;
  virtual void remove_all_plan() = 0;
  virtual int64_t get_mem_size() = 0;
  virtual int remove_plan_stat()
  {
    return common::OB_SUCCESS;
  }
  virtual void reset();
  virtual int init_new_set(const ObPlanCacheCtx& pc_ctx, const ObCacheObject& cache_obj, int64_t outline_param_idx,
      common::ObIAllocator* pc_alloc_);
  /*  static int check_array_bind_same_bool_param(*/
  // const Ob2DArray<ObParamInfo,
  // OB_MALLOC_BIG_BLOCK_SIZE,
  // ObWrapperAllocator, false> &param_infos,
  // const ParamStore & param_store,
  /*bool &same_bool_param);*/
  inline bool is_multi_stmt_plan() const
  {
    return !multi_stmt_rowkey_pos_.empty();
  }

  private:
  bool is_match_outline_param(int64_t param_idx)
  {
    return outline_param_idx_ == param_idx;
  }

  /**
   * @brief set const param constraints
   *
   */
  int set_const_param_constraint(
      common::ObIArray<ObPCConstParamInfo>& const_param_constraint, const bool is_all_constraint);

  int set_equal_param_constraint(common::ObIArray<ObPCParamEqualInfo>& equal_param_constraint);

  /**
   * @brief Match const param constraint.
   * If all_plan_const_param_constraints_ is not empty, check wether the constraints is mached and return the result.
   * If all_plan_const_param_constraints_ is empty, but any of the constraints in all_possible_const_param_constraints_
   * is matched, the is_matched is false (new plan shoule be generated).
   *
   * @param params Const Params about to match
   * @retval is_matched Matching result
   */
  int match_constraint(const ParamStore& params, bool& is_matched);

  int init_pre_calc_exprs(
      const ObCacheObject& cache_obj, const bool is_use_static_engine, common::ObIAllocator* pc_alloc_);

  int pre_calc_exprs(ObExecContext& exec_ctx);

  static int check_vector_param_same_bool(const ObObjParam& param_obj, bool& first_val, bool& is_same);

  DISALLOW_COPY_AND_ASSIGN(ObPlanSet);
  friend class ::test::TestPlanSet_basic_Test;

  protected:
  common::ObArenaAllocator alloc_;
  ObPlanCacheValue* plan_cache_value_;
  ObPlanSetType type_;
  common::Ob2DArray<ObParamInfo, common::OB_MALLOC_BIG_BLOCK_SIZE, common::ObWrapperAllocator, false> params_info_;
  common::SpinRWLock ref_lock_;
  stmt::StmtType stmt_type_;
  bool fetch_cur_time_;
  bool is_ignore_stmt_;
  int64_t outline_param_idx_;
  // related user session var names
  common::ObFixedArray<common::ObString, common::ObIAllocator> related_user_var_names_;
  UserSessionVarMetaArray related_user_sess_var_metas_;
  ConstParamConstraint all_possible_const_param_constraints_;
  ConstParamConstraint all_plan_const_param_constraints_;
  EqualParamConstraint all_equal_param_constraints_;
  ObSEArray<uint64_t, 4> trans_happened_route_;
  // record which T_CHAR const matches T_VARCHAR
  common::ObBitSet<> change_char_index_;
  // maintain the rowkey position for multi_stmt
  common::ObFixedArray<int64_t, common::ObIAllocator> multi_stmt_rowkey_pos_;
  // pre calculable expression list handler.
  PreCalcExprHandler* pre_cal_expr_handler_;
};

struct TablePart {
  TablePart() : table_id_(OB_INVALID_ID), ref_table_id_(OB_INVALID_ID), part_id_(OB_INVALID_ID){};

  TablePart(int64_t tid, int64_t ref_tid, int64_t pid, ObPGKey pg_key)
      : table_id_(tid), ref_table_id_(ref_tid), part_id_(pid), pg_key_(pg_key)
  {}

  int64_t table_id_;
  int64_t ref_table_id_;
  int64_t part_id_;
  ObPGKey pg_key_;
  TO_STRING_KV(K_(table_id), K_(ref_table_id), K_(part_id), K_(pg_key));
};

class ObSqlPlanSet : public ObPlanSet {
  public:
  ObSqlPlanSet()
      : ObPlanSet(PST_SQL_CRSR),
        is_all_non_partition_(true),
        table_locations_(),  // use default behavior for memory release
        array_binding_plan_(),
        local_plan_(NULL),
        remote_plan_(NULL),
        dist_plans_(),
        need_try_plan_(0),
        has_duplicate_table_(false),
        // has_array_binding_(false),
        table_location_fast_calc_(true),
        is_contain_virtual_table_(false),
        enable_inner_part_parallel_exec_(false)
  {}

  virtual ~ObSqlPlanSet()
  {}

  public:
  virtual int add_cache_obj(ObCacheObject& cache_object, ObPlanCacheCtx& pc_ctx, int64_t ol_param_idx) override;
  virtual int select_plan(
      share::ObIPartitionLocationCache* location_cache, ObPlanCacheCtx& pc_ctx, ObCacheObject*& cache_obj) override;
  virtual int remove_plan_stat() override;
  virtual void remove_all_plan() override;
  virtual int64_t get_mem_size() override;
  virtual void reset() override;
  virtual int init_new_set(const ObPlanCacheCtx& pc_ctx, const ObCacheObject& cache_obj, int64_t outline_param_idx,
      common::ObIAllocator* pc_alloc_) override;
  // calculate phy_plan type:
  // @param [in]  phy_locations
  // @param [out] plan_type
  static int calc_phy_plan_type_v2(const common::ObIArray<ObPhyTableLocationInfo>& phy_locations,
      ObPhyPlanType& plan_type, bool need_check_on_same_server);
  static int calc_phy_plan_type(
      const common::ObIArray<ObPhyTableLocationInfo>& phy_locations, ObPhyPlanType& plan_type);
  inline bool has_duplicate_table() const
  {
    return has_duplicate_table_;
  }
  // inline bool has_array_binding() const { return has_array_binding_; }
  inline bool enable_inner_part_parallel() const
  {
    return enable_inner_part_parallel_exec_;
  }

  static int get_table_array_binding_parition_ids(const ObTableLocation& table_location,
      const ObSQLSessionInfo* session, const ParamStore& param_store, share::schema::ObSchemaGetterGuard* schema_guard,
      ObExecContext& exec_ctx, ObIArray<int64_t>& partition_ids);

  private:
  enum {
    TRY_PLAN_LATE_MAT = 1,
    TRY_PLAN_OR_EXPAND = 1 << 1,
    TRY_PLAN_UNCERTAIN = 1 << 2,
    TRY_PLAN_GLOBAL_INDEX = 1 << 3,
  };
  int get_plan_normal(share::ObIPartitionLocationCache& location_cache, ObPlanCacheCtx& pc_ctx, ObPhysicalPlan*& plan);

  int get_local_plan_direct(ObPlanCacheCtx& pc_ctx, bool& is_direct_local_plan, ObPhysicalPlan*& plan);

  int gen_partition_key(common::ObIArray<TablePart>& table_parts, ObPartitionKey& partition_key);

  int replace_partition_id(ObPhyTableLocationIArray& phy_locations, common::ObIArray<TablePart>& table_parts);

  int get_partition_id(ObIArray<TablePart>& table_parts, int64_t table_id, int64_t& partition_id, ObPGKey& pg_key);

  int get_partition_key(const ObIArray<ObPhyTableLocationInfo>& phy_location_infos, const ObAddr& addr,
      const ObPhyPlanType& plan_type, ObPartitionKey& partition_key);

  int add_plan(ObPhysicalPlan& plan, ObPlanCacheCtx& pc_ctx, int64_t outline_param_idx);

  int get_plan_special(share::ObIPartitionLocationCache& location_cache, ObPlanCacheCtx& pc_ctx, ObPhysicalPlan*& plan);

  int get_phy_locations(const ObIArray<ObTableLocation>& table_locations, ObPlanCacheCtx& pc_ctx,
      share::ObIPartitionLocationCache& location_cache, ObIArray<ObPhyTableLocationInfo>& phy_location_infos,
      bool& need_check_on_same_server);

  int get_phy_locations(const ObTablePartitionInfoArray& partition_infos, ObIArray<ObPhyTableLocation>& phy_locations,
      ObIArray<ObPhyTableLocationInfo>& phy_location_infos);

  int set_concurrent_degree(int64_t outline_param_idx, ObPhysicalPlan& plan);

  int get_plan_type(const ObIArray<ObTableLocation>& table_locations, const bool is_contain_uncertain_op,
      ObPlanCacheCtx& pc_ctx, share::ObIPartitionLocationCache& location_cache,
      ObIArray<ObPhyTableLocationInfo>& phy_location_infos, ObPhyPlanType& plan_type);

  int init_phy_location(int64_t count)
  {
    return local_phy_locations_.reserve(count);
  }

  static int is_partition_in_same_server(
      const ObIArray<ObPhyTableLocationInfo>& phy_location_infos, bool& is_same, ObAddr& first_addr);

  static int extend_param_store(
      const ParamStore& params, common::ObIAllocator& allocator, ObIArray<ParamStore*>& expanded_params);

  int get_array_bind_plan_type(ObPlanCacheCtx& pc_ctx, share::ObIPartitionLocationCache& location_cache,
      ObPhyPlanType& plan_type, ObIArray<ObSEArray<int64_t, 1024>>& dup_partition_idss);

  int get_array_bind_partition_ids(ObExecContext& exec_ctx, share::schema::ObSchemaGetterGuard* schema_guard,
      const ObSQLSessionInfo* session, ObIArray<ObSEArray<int64_t, 1024>>& dup_partition_idss);

  int get_array_bind_phy_location(const ObPlanCacheCtx& pc_ctx, const ObIArray<ObSEArray<int64_t, 4>>& partition_idss,
      ObExecContext& exec_ctx, share::ObIPartitionLocationCache& location_cache,
      ObIArray<ObPhyTableLocationInfo>& phy_location_infos, bool& need_check_on_same_server);

  int init_fast_calc_part_ids_info();

  int can_fast_calc_part_ids(const ParamStore& params, bool& fast_calc) const;

  int fast_get_array_bind_partition_ids(ObExecContext& exec_ctx, share::schema::ObSchemaGetterGuard* schema_guard,
      const ObSQLSessionInfo* session, ObIArray<ObSEArray<int64_t, 1024>>& dup_partition_idss);

  int normal_get_array_bind_partition_ids(ObExecContext& exec_ctx, share::schema::ObSchemaGetterGuard* schema_guard,
      const ObSQLSessionInfo* session, ObIArray<ObSEArray<int64_t, 1024>>& dup_partition_idss);

  bool is_local_plan_opt_allowed(int last_retry_err);
  int check_partition_status(const ObPartitionKey& pkey, bool& is_leader);

  private:
  bool is_all_non_partition_;  // if plan is with non-part tables
  common::ObSEArray<ObTableLocation, 4> table_locations_;
  common::ObString params_info_str_;
  // used for array binding, only local plan
  ObPhysicalPlan* array_binding_plan_;
  ObPhysicalPlan* local_plan_;
  ObPhysicalPlan* remote_plan_;
  ObDistPlans dist_plans_;
  // for direct get local plan
  ObPhyTableLocationFixedArray local_phy_locations_;
  ObLoctionSensitiveHint loc_sensitive_hint_;
  ObPartitionKey partition_key_;

  // scenario: or expansion, late materialization, global index
  // whoes plan_set's table location is different with plan's table location
  // must retrieve table location from plan and calculate phy partition info
  int64_t need_try_plan_;
  bool has_duplicate_table_;
  // can fast calc part ids for array binding?
  bool table_location_fast_calc_;
  ObSEArray<int64_t, 4> part_param_idxs_;
  // skip local plan test if it is virtual table
  bool is_contain_virtual_table_;
  // if px dop > 1
  bool enable_inner_part_parallel_exec_;
};

class ObPLPlanSet : public ObPlanSet {
  public:
  ObPLPlanSet() : ObPlanSet(PST_PRCD), pl_obj_(NULL)
  {}
  virtual ~ObPLPlanSet()
  {}

  public:
  virtual int add_cache_obj(ObCacheObject& cache_object, ObPlanCacheCtx& pc_ctx, int64_t ol_param_idx) override;
  virtual int select_plan(
      share::ObIPartitionLocationCache* location_cache, ObPlanCacheCtx& pc_ctx, ObCacheObject*& cache_obj) override;
  virtual int remove_plan_stat() override;
  virtual void remove_all_plan() override;
  virtual int64_t get_mem_size() override;
  virtual void reset() override;

  private:
  sql::ObCacheObject* pl_obj_;
};

inline ObPlanSetType ObPlanSet::get_plan_set_type_by_cache_obj_type(ObCacheObjType co_type)
{
  ObPlanSetType ret = PST_MAX;
  switch (co_type) {
    case T_CO_SQL_CRSR:
      ret = PST_SQL_CRSR;
      break;
    case T_CO_PRCR:
    case T_CO_SFC:
    case T_CO_PKG:
    case T_CO_ANON:
      ret = PST_PRCD;
      break;
    default:
      break;
  }
  return ret;
}
}  // namespace sql
}  // namespace oceanbase
#endif
